package com.whut.monitor.util;

import com.jcraft.jsch.*;
import com.whut.monitor.websocket.PollWebSocketServlet;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * 远程登录SSH执行指令
 *
 * @author Sandeepin
 *         2017/11/30 0030
 */
public class SshUtil {

    private static final int DEFAULT_SSH_PORT = 22;

    private String ipAddress;
    private String username;
    private String password;

    public SshUtil(final String ipAddress, final String username, final String password) {
        this.ipAddress = ipAddress;
        this.username = username;
        this.password = password;
    }

    // 执行命令
    public int execute(final String command) {
        int returnCode = 0;
        JSch jsch = new JSch();
        MyUserInfo userInfo = new MyUserInfo();

        try {
            // 创建连接 session.
            Session session = jsch.getSession(username, ipAddress, DEFAULT_SSH_PORT);
            session.setPassword(password);
            session.setUserInfo(userInfo);
            session.connect();

            // 创建和连接 channel.
            Channel channel = session.openChannel("exec");
            ((ChannelExec) channel).setCommand(command);

            channel.setInputStream(null);
            BufferedReader input = new BufferedReader(new InputStreamReader(channel
                    .getInputStream()));

            channel.connect();
            System.out.println("输入的执行指令为：" + command);

            // 从远程控制台捕获输出
            String line;
            while ((line = input.readLine()) != null) {
                System.out.println("远程：" + line);
            }
            input.close();

            // Get the return code only after the channel is closed.
            if (channel.isClosed()) {
                returnCode = channel.getExitStatus();
            }

            // Disconnect the channel and session.
            channel.disconnect();
            session.disconnect();
        } catch (JSchException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnCode;
    }

    // 执行命令
    public int execute(final String command, String md5value, PollWebSocketServlet webSocketServlet) {
        int returnCode = 0;
        JSch jsch = new JSch();
        MyUserInfo userInfo = new MyUserInfo();

        try {
            // 创建连接 session.
            Session session = jsch.getSession(username, ipAddress, DEFAULT_SSH_PORT);
            session.setPassword(password);
            session.setUserInfo(userInfo);
            session.connect();

            // 创建和连接 channel.
            Channel channel = session.openChannel("exec");
            ((ChannelExec) channel).setCommand(command);

            channel.setInputStream(null);
            BufferedReader input = new BufferedReader(new InputStreamReader(channel
                    .getInputStream()));

            channel.connect();
            System.out.println("输入的执行指令为：" + command);

            // 从远程控制台捕获输出
            String line;
            while ((line = input.readLine()) != null) {
                System.out.println("远程：" + line);
                SendMessageUtil.sendMessage(md5value, line, webSocketServlet);
            }
            input.close();

            // Get the return code only after the channel is closed.
            if (channel.isClosed()) {
                returnCode = channel.getExitStatus();
            }

            // Disconnect the channel and session.
            channel.disconnect();
            session.disconnect();
        } catch (JSchException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnCode;
    }

    public static String sparkCompute(String filePath, String fileName, String md5value, String email, PollWebSocketServlet webSocketServlet) {
        System.out.println("国花、异想家spark远程计算！开始...");

        // 本机执行指令
        List<String> cmdsList = new ArrayList<>();
        cmdsList.add("scp " + filePath + fileName + " root@Master.Hadoop:/root/compute");
        for (String aList : cmdsList) {
            try {
                SendMessageUtil.sendMessage(md5value, "Spark远程计算开始...", webSocketServlet);
                System.out.println("开始散发文件到分布式节点...");
                SendMessageUtil.sendMessage(md5value, "散发文件到分布式节点...", webSocketServlet);
                Process pr = Runtime.getRuntime().exec(aList);
                InputStream inputStream = pr.getErrorStream();
                BufferedReader in = new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                    SendMessageUtil.sendMessage(md5value, line, webSocketServlet);
                }
                inputStream.close();
                in.close();
                pr.waitFor();
                pr.destroy();
                System.out.println("文件传输完成");
                SendMessageUtil.sendMessage(md5value, "文件传输完成...", webSocketServlet);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // 登录ssh执行spark
        try {
            System.out.println("开始执行spark计算...");
            SendMessageUtil.sendMessage(md5value, "开始执行spark计算...", webSocketServlet);
            SshUtil sshExecutor = new SshUtil("192.168.1.100", "root", "root");
            String cmds = "cd /root/compute\n" +
                    "mkdir " + md5value + "\n" +
                    "mv " + fileName + " " + md5value + "\n" +
                    "cd " + md5value + "\n" +
                    "mkdir model\n" +
                    "unzip " + fileName + "\n" +
                    "cd /home/spark/bin \n" +
                    "./spark-submit --master spark://192.168.1.100:6066 /root/compute/" + md5value + "/run.py\n" +
                    "cd /root/compute/" + md5value + "\n" +
                    "zip -r model.zip model\n" +
                    "scp /root/compute/" + md5value + "/model.zip root@192.168.1.200:/home/whut/soft/tomcat/webapps/ROOT/file/compute/" + md5value;
            int exitStatus = sshExecutor.execute(cmds, md5value, webSocketServlet);
            System.out.println("exitStatus：" + exitStatus);
            SendMessageUtil.sendMessage(md5value, "Spark远程计算完成！状态：" + exitStatus, webSocketServlet);
            String modelDownloadLink = String.valueOf(exitStatus) + "#http://59.69.101.206/file/compute/" + md5value + "/model.zip";
            System.out.println("spark计算完毕，下载地址：\n" + modelDownloadLink);
            System.out.println("国花、异想家spark远程计算完成！");
            return modelDownloadLink;
        } catch (Exception e) {
            e.printStackTrace();
            return "-2#Error";
        }
    }

    public static void main(final String[] args) {
        System.out.println("国花、异想家spark远程Demo！开始...");

        // 基本信息
        String md5value = "whut1234";
        String fileName = md5value + "_710229096@qq.com.zip";

        // 本机执行指令
        List<String> cmdsList = new ArrayList<String>();
        cmdsList.add("mkdir /home/whut/soft/tomcat/webapps/ROOT/file/compute/" + md5value);
        cmdsList.add("scp /home/whut/" + md5value + "/" + fileName + " root@Master.Hadoop:/root/compute");
        int cnt = 0;
        for (String aList : cmdsList) {
            try {
                if (cnt == 0) {
                    System.out.println("创建文件夹...");
                }
                if (cnt == 1) {
                    System.out.println("开始拷贝文件到Hadoop节点...");
                }
                Process pr = Runtime.getRuntime().exec(aList);
                InputStream inputStream = pr.getErrorStream();
                BufferedReader in = new BufferedReader(new InputStreamReader(inputStream));
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
                inputStream.close();
                in.close();
                pr.waitFor();
                pr.destroy();
                if (cnt == 0) {
                    System.out.println("创建文件夹完成");
                }
                if (cnt == 1) {
                    System.out.println("文件传输完成");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            cnt++;
        }

        // 登录ssh执行spark
        System.out.println("开始执行spark计算...");
        SshUtil sshExecutor = new SshUtil("192.168.1.100", "root", "root");
        String cmds = "cd /root/compute\n" +
                "mkdir " + md5value + "\n" +
                "mv " + fileName + " " + md5value + "\n" +
                "cd " + md5value + "\n" +
                "mkdir model\n" +
                "unzip " + fileName + "\n" +
                "cd /home/spark/bin \n" +
                "./spark-submit --master spark://192.168.1.100:6066 /root/compute/" + md5value + "/run.py\n" +
                "cd /root/compute/" + md5value + "\n" +
                "zip -r model.zip model\n" +
                "scp /root/compute/" + md5value + "/model.zip root@192.168.1.200:/home/whut/soft/tomcat/webapps/ROOT/file/compute/" + md5value;
        sshExecutor.execute(cmds);
        System.out.println("spark计算完毕，下载地址：");
        System.out.println("http://59.69.101.206/file/compute/" + md5value + "/model.zip");

        System.out.println("国花、异想家spark远程Demo测试通过！");
    }

    // 重写用户信息内部类
    public class MyUserInfo implements UserInfo {
        private String password;
        private String passphrase;

        @Override
        public String getPassphrase() {
            System.out.println("用户信息：获取密码 ");
            return null;
        }

        @Override
        public String getPassword() {
            System.out.println("用户信息：得到密码 ");
            return null;
        }

        @Override
        public boolean promptPassphrase(final String arg0) {
            System.out.println("用户信息：提示密码 " + arg0);
            return false;
        }

        @Override
        public boolean promptPassword(final String arg0) {
            System.out.println("用户信息：提示密码 " + arg0);
            return false;
        }

        @Override
        public boolean promptYesNo(final String arg0) {
            System.out.println("用户信息：提示YesNo " + arg0);
            if (arg0.contains("The authenticity of host")) {
                return true;
            }
            return false;
        }

        @Override
        public void showMessage(final String arg0) {
            System.out.println("用户信息：显示消息 ");
        }
    }


}
